package streaming;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import producer.KafkaRedisConfig;
import redis.clients.jedis.Jedis;
import scala.Tuple2;
import util.JavaRedisClient;

import java.util.*;

/**
 * Created by chs on 8/27/18.
 */
public class ClickStreamingAnalysis {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("ClickStreamingAnalysis");

        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.sparkContext().setLogLevel("WARN");
        // Kafka configurations
        String[] topics = KafkaRedisConfig.KAFKA_CLICK_TOPIC.split("\\,");
        System.out.println("Topics: " + Arrays.toString(topics));
        String brokers = KafkaRedisConfig.KAFKA_ADDR;

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");

        // Create a direct stream 这里使用spark-streaming-kafka-0-8_2.11中的kafkautil
        JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams,
                new HashSet<String>(Arrays.asList(topics)));
        //接收到的数据格式为{"uid":"2016-10-04 12:22:30,1"} 创建json对象
        JavaDStream<JSONObject> events = kafkaStream.map(new Function<Tuple2<String, String>, JSONObject>() {
            public JSONObject call(Tuple2<String, String> line) throws Exception {
                System.out.println("line:" +line._1()+":"+ line._2());
                JSONObject data = JSON.parseObject(line._2());
                return data;
            }
        });
        //取出pid 并map成(pid,1)的格式，然后聚合即可算出此批次该pid的点击次数
        JavaPairDStream<String, Long> clickDs = events.mapToPair(new PairFunction<JSONObject, String, Long>() {
            public Tuple2<String, Long> call(JSONObject json) throws Exception {
//                System.out.println("clickUID:" + json.getString("uid"));
                return new Tuple2<String, Long>(json.getString("uid").split(",")[1], 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
        final String clickHashKey = "pid::click";
        clickDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
            public void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    public void call(Iterator<Tuple2<String, Long>> tuple2Iterator) throws Exception {
                        //在foreachPartition中创建jdeis的连接可以减少连接
                        Jedis jedis = JavaRedisClient.get().getResource();
                        try{
                            while (tuple2Iterator.hasNext()){
                                Tuple2<String, Long> next = tuple2Iterator.next();
                                String pid = "click"+next._1();
                                Long clickCount = next._2();
                                jedis.hincrBy(clickHashKey, pid, clickCount);
                                System.out.println(pid+":"+clickCount);
                            }
                        }catch (Exception e){
                            System.out.println("error:"+e);
                        }
                        //用完一定要关了，不然连接池泄露程序就卡主了
                        jedis.close();
                    }
                });
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}
